
8 ways to use CDC Queries for more powerful data streaming

Last edited on June 1, 2023


    CDC Queries are SQL-like statements that allow you to (1) filter, (2) transform, and (3) choose the schema of your data stream. Instead of dealing with the headaches of sidecar services or downstream tools, you can now filter and transform streaming data with a simple SQL query. Here is a quick video tutorial about how to use CDC Queries that accounts for all the new functionality as of the CockroachDB 23.1 release:

    What are changefeeds?

    Changefeeds allow you to stream continuous updates from a table, or a one-time snapshot of the table, to a SQL connection or external system. They're native to CockroachDB, meaning you don't need to spend time and money setting up a third-party change data capture system. Changefeeds have been used to fulfill a wide range of goals, from keeping inventory management systems up to date in real time to replicating data into data warehouses. You can build a data pipeline for business analytics, event-driven architectures, data archival, audit logging, migrations, and more.




    CDC Queries make it even easier and less expensive to operate changefeeds. If you know SQL, you know how to create changefeeds with CDC Queries. The basic format looks like this:

    CREATE CHANGEFEED [INTO 'sink'] [WITH opt=val, ...] AS SELECT [columns or expressions] FROM t WHERE [functions];

    With these primitives in hand, we can easily and elegantly create interesting and complex streaming behaviors. Here are 8 things you can do with them.

    Note: CDC Queries are currently in preview, with a planned GA in our 23.1 release. The CDC Queries UX and capabilities are likely to evolve. Check out our documentation for up-to-date limitations and details.

    8 CDC Transformation Use Cases

    1. Only send INSERTS & UPDATES for your outbox table

    Every message in an outbox table is meant to be streamed and then deleted. With a plain changefeed, each of those deletes is emitted as well, which doubles the number of messages you have to send. CDC Queries allow you to filter out delete messages from the stream.

    CREATE CHANGEFEED INTO 'kafka://[endpoint]' WITH resolved, schema_change_policy=stop AS SELECT * FROM outbox WHERE NOT cdc_is_delete();

    2. Remove the need for an outbox table

    Outbox tables allow you to create an event stream with custom fields and structure. CDC Queries can replace the need to store this information in an outbox table. Instead, you can set the metadata and structure in the changefeed.

    CREATE CHANGEFEED INTO 'kafka://[endpoint]?topic_name=events' AS SELECT cdc_updated_timestamp()::int AS event_timestamp, 'users' AS "table", IF(cdc_is_delete(),'delete','create') AS type, jsonb_build_object('email', email, 'admin', admin) AS payload FROM users;

    In this example, our message will include metadata (the timestamp for the event, the name of the table, and whether this event was a delete or, for any other change, a create) and a payload column with the event data we want in JSON format.

    3. Choose which table columns will be included in your stream

    Often you only need a subset of columns from a table for your streaming use case. By filtering unneeded columns with CDC Queries, you only get messages when the columns you care about have been updated. Message sizes can also be greatly reduced. Filtering columns can be a great way to reduce Total Cost of Ownership (TCO) of your streaming pipeline by reducing the required allocation of compute and storage resources downstream.

    CREATE CHANGEFEED INTO 'kafka://[endpoint]' WITH schema_change_policy=stop AS SELECT column1, column2 FROM table;

    4. Emit only the events that made changes you care about

    CDC Queries can filter out all updates that don’t modify the column we care about.

    For example, suppose you run a gaming platform where each game's meta information is represented in a single row, with a status column that indicates when the game goes from the "in progress" state to the "finished" state. You might have a microservice that manages the leaderboard calculation for your gaming platform. That microservice only cares about games whose state is "finished", so we can filter out updates that do not change the game state.

    CREATE CHANGEFEED INTO 'kafka://[endpoint]' WITH schema_change_policy=stop, diff AS SELECT *, cdc_prev() AS previous FROM important_table WHERE important_column != cdc_prev()->'important_column';
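    Applied to the gaming example, the filter might look like the following sketch. The `games` table and `status` column are hypothetical names, and the statement reuses the `cdc_prev()` form shown above. How the previous row is exposed may differ between CockroachDB versions, so check the documentation for your release before relying on it.

```sql
-- Sketch only: `games` and `status` are assumed names for illustration.
-- Emit a message only when a game transitions into the 'finished' state.
-- ->> reads the previous status out of the JSON value as text.
CREATE CHANGEFEED INTO 'kafka://[endpoint]' WITH schema_change_policy=stop AS SELECT * FROM games WHERE status = 'finished' AND (cdc_prev()->>'status') != 'finished';
```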

    5. Emit only events that match certain criteria

    CDC Queries let you filter a row out if the contents of the row do not match certain criteria. For example, you may have a user or application column that tracks which user or application modified that row. You can create a changefeed that only serves messages for rows modified by a particular user or application. Alternatively, you may want to match on a condition such as age > 18.

    CREATE CHANGEFEED INTO 'kafka://[endpoint]' WITH schema_change_policy=stop AS SELECT * FROM table WHERE application='prod_microservice';
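    The age condition mentioned above could be expressed the same way. This is a minimal sketch that assumes a hypothetical `users` table with a numeric `age` column.

```sql
-- Sketch only: `users` and `age` are assumed names for illustration.
-- Emit messages only for rows where age is greater than 18.
CREATE CHANGEFEED INTO 'kafka://[endpoint]' WITH schema_change_policy=stop AS SELECT * FROM users WHERE age > 18;
```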

    6. Emit only events from a certain region

    In CockroachDB, you can set data-placement locality in a table at the row level. CDC Queries allow you to choose rows from a region or set of regions from your table to include in your changefeed. This can prevent costly cross-region data transfer or help ensure data fencing.

    CREATE CHANGEFEED INTO 'kafka://[endpoint]' WITH schema_change_policy=stop AS SELECT * FROM table WHERE region='US/east';

    7. Ensure a stable message schema, even with schema changes

    CockroachDB allows online changes to your table's schema that prevent application downtime. However, this can negatively impact streaming pipelines, because schema changes such as adding a new column or changing a column's type can change the schema of your changefeed message as well. CDC Queries let you set the columns and data types for your changefeed message, stabilizing the schema in the case of a schema change.

    CREATE CHANGEFEED INTO 'kafka://[endpoint]' WITH schema_change_policy=stop AS SELECT id::string, name::string, admin::string FROM users_table;

    8. Load balance changefeed messages across feeds

    CDC Queries can be used to shard changefeeds, as a means to load balance. For example, you could create several changefeeds, each taking on ⅛ of the unique rows for the table:

    CREATE CHANGEFEED INTO 'kafka://[endpoint]' WITH schema_change_policy=stop AS SELECT * FROM table WHERE id % 8 = 0;
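    The statement above covers only the first shard. As a sketch, the remaining shards would use the same query with the other remainders. This assumes `id` is a non-negative integer column, since the remainder of a negative value is itself negative in SQL and such rows would not match any of the filters for remainders 1 through 7.

```sql
-- Sketch only: assumes a non-negative integer `id` column.
-- Shard with remainder 1 (remainder 0 is the statement above).
CREATE CHANGEFEED INTO 'kafka://[endpoint]' WITH schema_change_policy=stop AS SELECT * FROM table WHERE id % 8 = 1;
-- Repeat with id % 8 = 2 through id % 8 = 6 for the next five shards.
-- Shard with remainder 7, the last of the eight.
CREATE CHANGEFEED INTO 'kafka://[endpoint]' WITH schema_change_policy=stop AS SELECT * FROM table WHERE id % 8 = 7;
```

    Because every row's remainder falls in exactly one of the eight filters, each row change is emitted by exactly one changefeed.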

    Video demo of CDC Queries at work

    In this video demonstration of CDC Queries, I show three different use cases. (Note: the name of our CDC Queries feature was "CDC Transformations" at the time of recording.) If you have questions about those use cases, you can comment on the video or reach out to us on our community Slack.
